package cn.doitedu.etl;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.util.Collector;

import java.sql.Timestamp;

/**
 * @Author: deep as the sea
 * @Site: <a href="www.51doit.com">多易教育</a>
 * @QQ: 657270652
 * @Date: 2022/12/14
 * @Tips: 学大数据，到多易教育
 * @Desc:
 *   视频播放数据分析轻度聚合
 **/
public class E05_EtlJob_VideoAgg01 {

    /**
     * Job entry point. Pipeline stages:
     *   1. Read dimension-widened mall events from Kafka (Flink SQL connector).
     *   2. Lookup-join video metadata from an HBase dimension table.
     *   3. Per user, fold the per-play event stream into one running
     *      {@link VideoAggBean} per play session (keyed MapState).
     *   4. Stream the aggregates into a Doris sink table.
     *
     * @param args unused
     * @throws Exception propagated from Flink job submission / execution
     */
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointStorage("file:/d:/checkpoint");
        env.setParallelism(1);

        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Kafka connector table for the input stream (events already widened
        // with common dimensions upstream).
        tEnv.executeSql(
                " CREATE TABLE mall_events_commondim_kafkasource(          "
                        + "     user_id           INT,                            "
                        + "     event_id          string,                         "
                        + "     event_time        bigint,                         "
                        + "     release_channel   string,                         "
                        + "     device_type       string,                         "
                        + "     properties        map<string,string>,            "
                        + "     proc_time   AS PROCTIME()                       " // lookup join requires a processing-time attribute
                        + " ) WITH (                                            "
                        + "  'connector' = 'kafka',                             "
                        + "  'topic' = 'mall-evts-comdim-w',                    "
                        + "  'properties.bootstrap.servers' = 'doitedu:9092',   "
                        + "  'properties.group.id' = 'gp01',                    "
                        + "  'scan.startup.mode' = 'latest-offset',             "
                        + "  'value.format'='json',                             "
                        + "  'value.json.fail-on-missing-field'='false',        "
                        + "  'value.fields-include' = 'EXCEPT_KEY')             ");


        // HBase connector table: video metadata dimension table.
        tEnv.executeSql("CREATE TABLE dim_video_info_hbasesource( " +
                " id INT, " +
                " f ROW<video_type STRING, video_album STRING, video_author STRING,video_timelong BIGINT, create_time TIMESTAMP(3)>, " +
                " PRIMARY KEY (id) NOT ENFORCED " +
                ") WITH (                             " +
                " 'connector' = 'hbase-2.2',          " +
                " 'table-name' = 'dim_video_info',     " +
                " 'zookeeper.quorum' = 'doitedu:2181' " +
                ")");

        // Widen video-play events with video metadata via a lookup join.
        // FIX: the pause event was misspelled 'vide_pause' and therefore
        // filtered out; corrected to 'video_pause'.
        tEnv.executeSql(
                " CREATE TEMPORARY VIEW joined_view AS                                                             "
                        + " SELECT                                                                                           "
                        + " user_id,event_id,event_time,release_channel,                                                     "
                        + " device_type,video_id,video_play_id,                                                              "
                        + " video_type,video_album,video_author,video_timelong,create_time                                   "
                        + " FROM                                                                                             "
                        + " (                                                                                                "
                        + "  SELECT                                                                                          "
                        + "    user_id,event_id,event_time,release_channel,                                                  "
                        + "    device_type,cast(properties['video_id'] as INT) as video_id,                                  "
                        + "    properties['video_play_id'] as video_play_id,                                                 "
                        + "    proc_time                                                                                     "
                        + "  FROM mall_events_commondim_kafkasource                                                          "
                        + "  WHERE event_id in ('video_play','video_beat','video_pause','video_resume','video_stop')         "
                        + " ) v                                                                                              "
                        + " LEFT JOIN dim_video_info_hbasesource FOR SYSTEM_TIME AS OF v.proc_time AS d ON v.video_id = d.id "
        );


        // Core aggregation logic is easier to express with the DataStream API.
        DataStream<VideoInfoBean> beans = tEnv.toDataStream(tEnv.from("joined_view"), VideoInfoBean.class);

        SingleOutputStreamOperator<VideoAggBean> resDs = beans.keyBy(bean -> bean.getUser_id())
                .process(new KeyedProcessFunction<Integer, VideoInfoBean, VideoAggBean>() {
                    // One aggregate per play session; a user can have several
                    // concurrent play sessions, hence MapState rather than ValueState.
                    MapState<String, VideoAggBean> aggState;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        aggState = getRuntimeContext().getMapState(new MapStateDescriptor<String, VideoAggBean>("aggBean", String.class, VideoAggBean.class));
                    }

                    @Override
                    public void processElement(VideoInfoBean bean, KeyedProcessFunction<Integer, VideoInfoBean, VideoAggBean>.Context ctx, Collector<VideoAggBean> out) throws Exception {
                        // Stream is keyed by user_id, so within this key the
                        // play id alone identifies the session; the user_id
                        // prefix is kept for state-key compatibility.
                        String stateKey = bean.getUser_id() + bean.getVideo_play_id();
                        VideoAggBean stateBean = aggState.get(stateKey);
                        if (stateBean == null) {
                            // First event of this play session: initialize the aggregate.
                            stateBean = new VideoAggBean();

                            stateBean.setUser_id(bean.getUser_id());
                            stateBean.setRelease_channel(bean.getRelease_channel());
                            stateBean.setDevice_type(bean.getDevice_type());
                            stateBean.setVideo_id(bean.getVideo_id());
                            stateBean.setVideo_play_id(bean.getVideo_play_id());
                            stateBean.setVideo_type(bean.getVideo_type());
                            stateBean.setVideo_album(bean.getVideo_album());
                            stateBean.setVideo_author(bean.getVideo_author());
                            stateBean.setVideo_timelong(bean.getVideo_timelong());
                            stateBean.setCreate_time(bean.getCreate_time());
                            stateBean.setPlay_start_time(bean.getEvent_time());
                            stateBean.setPlay_end_time(bean.getEvent_time());
                        }

                        // On video_resume, restart the segment: start_time = event_time,
                        // so Doris records a new row for the resumed segment.
                        // (Constant-first equals avoids an NPE on a null event_id.)
                        if ("video_resume".equals(bean.getEvent_id())) {
                            stateBean.setPlay_start_time(bean.getEvent_time());
                        }

                        // Every event advances the segment end time.
                        stateBean.setPlay_end_time(bean.getEvent_time());

                        // FIX: write the mutated bean back into state. With the
                        // RocksDB backend MapState.get() returns a deserialized
                        // copy, so in-place mutation alone would be lost.
                        aggState.put(stateKey, stateBean);

                        out.collect(stateBean);

                        // FIX: drop the finished session so keyed state stays
                        // bounded; video_stop is the terminal event of a play.
                        if ("video_stop".equals(bean.getEvent_id())) {
                            aggState.remove(stateKey);
                        }
                    }
                });


        // Register the aggregate stream as a temporary view for SQL insertion.
        tEnv.createTemporaryView("res",resDs);

        // Doris connector sink table. The label prefix must be unique per job
        // submission, hence the timestamp suffix.
        tEnv.executeSql(
                " create table video_agg_01_dorissink(    "
                        + "     start_dt         DATE,         "
                        + "     user_id          INT,          "
                        + "     release_channel  VARCHAR(20),  "
                        + "     device_type      VARCHAR(20),  "
                        + "     video_id         INT ,         "
                        + "     video_play_id    VARCHAR(20),  "
                        + "     video_type       VARCHAR(20),  "
                        + "     video_album      VARCHAR(40),  "
                        + "     video_author     VARCHAR(40),  "
                        + "     video_timelong   BIGINT,       "
                        + "     create_time      TIMESTAMP,    "
                        + "     play_start_time  BIGINT ,      "
                        + "     play_end_time    BIGINT        "
                        + " ) WITH (                               "
                        + "    'connector' = 'doris',              "
                        + "    'fenodes' = 'doitedu:8030',         "
                        + "    'table.identifier' = 'dws.video_agg_01',  "
                        + "    'username' = 'root',                "
                        + "    'password' = '',                    "
                        + "    'sink.label-prefix' = 'doris_tl" + System.currentTimeMillis() + "')"
        );


        // Insert the aggregates into Doris, deriving the partition date from
        // the play start timestamp (epoch millis).
        tEnv.executeSql("INSERT INTO video_agg_01_dorissink  " +
                "SELECT TO_DATE(date_format(to_timestamp_ltz(play_start_time,3),'yyyy-MM-dd')) as start_dt," +
                "user_id,release_channel,device_type,video_id,video_play_id,video_type," +
                "video_album,video_author,video_timelong,create_time,play_start_time,play_end_time from res").print();

        env.execute();
    }

    /**
     * Input bean: one dimension-widened video event row from {@code joined_view}.
     * Fields map 1:1 (by name) to the view's columns; dimension fields coming
     * from the HBase LEFT JOIN may be null when the lookup misses.
     * Lombok generates getters/setters, equals/hashCode/toString and both
     * constructors — do not hand-edit accessors.
     */
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class VideoInfoBean {
        Integer user_id;          // keying field of the downstream keyBy
        String event_id;          // one of the video_* play events
        Long event_time;          // event timestamp — presumably epoch millis (fed to to_timestamp_ltz(..., 3) downstream)
        String release_channel;
        String device_type;
        Integer video_id;         // parsed from properties['video_id']; HBase rowkey for the lookup join
        String video_play_id;     // identifies one play session of one video
        String video_type;        // dimension fields below come from dim_video_info (nullable on lookup miss)
        String video_album;
        String video_author;
        Long video_timelong;      // video duration — unit not visible here; TODO confirm (seconds vs millis)
        Timestamp create_time;
    }

    /**
     * Output bean: the running aggregate of one video play session
     * (one segment between a play/resume and the latest observed event).
     * Field names match the Doris sink table {@code dws.video_agg_01} columns.
     * Lombok generates accessors, equals/hashCode/toString and constructors.
     */
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class VideoAggBean {
        Integer user_id;
        String release_channel;
        String device_type;
        Integer video_id;
        String video_play_id;     // session id; also part of the MapState key
        String video_type;
        String video_album;
        String video_author;
        Long video_timelong;
        Timestamp create_time;
        Long play_start_time;     // segment start; reset on video_resume so Doris gets a new row per segment
        Long play_end_time;       // advanced to event_time on every event of the session
    }


}
